package day03.window;

import beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink 流处理 API - Window
 *
 * @author lvbingbing
 * @date 2022-01-01 13:49
 */
public class FlinkWindow00 {

    /**
     * 可执行环境
     */
    private StreamExecutionEnvironment env;

    /**
     * 接收socket文本流
     */
    private DataStreamSource<String> socketTextStream;

    /**
     * sensorReading格式的socket文本流
     */
    private SingleOutputStreamOperator<SensorReading> singleOutputStreamOperator;

    /**
     * 有参构造器会初始化成员变量
     *
     * @param parallelism 并行度
     */
    public FlinkWindow00(int parallelism) {
        init(parallelism);
    }

    public FlinkWindow00() {
    }

    /**
     * 初始化
     *
     * @param parallelism 并行度
     */
    public void init(int parallelism) {
        // 1、创建一个表示当前执行程序的上下文执行环境。如果程序是独立调用的，则此方法返回一个本地执行环境
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        // 2、接收socket文本流
        socketTextStream = env.socketTextStream("hadoop102", 7777);
        singleOutputStreamOperator = socketTextStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
    }

    /**
     * 获取可执行环境
     *
     * @return 可执行环境
     */
    public StreamExecutionEnvironment getEnv() {
        return env;
    }

    /**
     * 获取接收socket文本流
     *
     * @return socket文本流
     */
    public DataStreamSource<String> getSocketTextStream() {
        return socketTextStream;
    }

    /**
     * 获取sensorReading格式的socket文本流
     *
     * @return sensorReading格式的socket文本流
     */
    public SingleOutputStreamOperator<SensorReading> getSingleOutputStreamOperator() {
        return singleOutputStreamOperator;
    }
}
